/*
 * Copyright (C) 2017-2019 Dremio Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dremio.exec.store.deltalake;

import java.util.Collections;
import java.util.List;
import java.util.ListIterator;

import com.dremio.common.exceptions.UserException;
import com.google.common.base.Preconditions;

/**
 * This class is responsible for running validations and filtering snapshots generated by {@link DeltaMetadataFetchJobManager}.
 * A consolidated snapshot with all the info can be generated from the final filtered list
 *
 * Final list returned by createAndValidateFinalList method will be of the form
 *  if version != 0
 *    last.json, (last - 1).json ... start.checkpoint.parquet
 *  else
 *    last.json, (last - 1).json, ...   0000.json
 *
 *  Note that in the final list at most one checkpoint parquet is allowed and that should only be present
 *  if 00000.json is not present input list of snapshots.
 */

public class DeltaSnapshotListProcessor {

  public DeltaLogSnapshot consolidateSnapshots(List<DeltaLogSnapshot> snapshots) {
    Preconditions.checkState(!snapshots.isEmpty(), "No snapshot available for consolidation");
    DeltaLogSnapshot consolidatedSnapshot = new DeltaLogSnapshot("MERGE", 0, 0, 0, 0, Long.MIN_VALUE, false);

    snapshots.stream().forEach(consolidatedSnapshot::merge);
    return consolidatedSnapshot;
  }

  //Ensures that the current list is valid and creates a final list of snapshots.
  public List<DeltaLogSnapshot> findValidSnapshots(List<DeltaLogSnapshot> snapshots) {
    Collections.sort(snapshots, Collections.reverseOrder());

    ListIterator<DeltaLogSnapshot> iterator = snapshots.listIterator();

    //Move back till you don't find a checkpoint DeltaLogSnapshot
    long lastVersion = -1;
    boolean containsCheckpoint  = false;

    while (iterator.hasNext()) {
      DeltaLogSnapshot snapshot = iterator.next();

      if(lastVersion != -1 && snapshot.getVersionId() != lastVersion) {
        throw UserException.invalidMetadataError()
          .message("Missing version file " + lastVersion)
          .buildSilently();
      }

      lastVersion = lastVersion != -1 ? --lastVersion : snapshot.getVersionId() - 1;

      if(snapshot.containsCheckpoint()) {
        snapshots.subList(iterator.nextIndex(), snapshots.size()).clear();
        containsCheckpoint = true;
        break;
      }

    }

    Preconditions.checkState(containsCheckpoint || snapshots.get(snapshots.size() - 1).getVersionId() == 0, "Commit Json for version 0 not found while reading metadata");
    return snapshots;
  }

}
